Skip to content

Column Projection Pushdown: Q1 GROUP BY 16-17x faster#161

Open
poyrazK wants to merge 2 commits into
mainfrom
perf/group-by-optimization-phase3
Open

Column Projection Pushdown: Q1 GROUP BY 16-17x faster#161
poyrazK wants to merge 2 commits into
mainfrom
perf/group-by-optimization-phase3

Conversation

@poyrazK

@poyrazK poyrazK commented Jun 11, 2026

Copy link
Copy Markdown
Owner

Summary

  • Add column projection pushdown to ColumnarTable (read_batch with col_indices overload)
  • VectorizedSeqScanOperator gains set_required_columns() for column subset scanning
  • build_vectorized_plan() propagates required columns from GROUP BY to scan
  • Q1 GROUP BY: 161k -> 2.68M rows/s (16-17x speedup)
  • Gap vs DuckDB closes from 385x to 21x (10k), 1,196x to 64x (100k)

Benchmark Results

Benchmark Before After Change
Q1 GROUP BY 10k 161k/s 2.68M/s +16.6x
Q1 GROUP BY 100k 152k/s 2.67M/s +17.6x

Tests

  • All 39 vectorized_operator_tests pass
  • All 50 cloudSQL_tests pass

Summary by CodeRabbit

Release Notes

  • New Features
    • Vectorized query execution now supports column projection optimization for GROUP BY and aggregate operations. The query engine automatically identifies and reads only necessary table columns, reducing memory overhead and improving query performance.

- Add read_batch(col_indices) overload to ColumnarTable
- Add set_required_columns() to VectorizedSeqScanOperator
- Propagate required column indices from GROUP BY up to scan
- Q1 GROUP BY improves 16-17x (161k -> 2.68M rows/s)
- Gap vs DuckDB closes from 385x to 21x (10k)
- All 89 tests pass
@coderabbitai

coderabbitai Bot commented Jun 11, 2026

Copy link
Copy Markdown

Review Change Stack

📝 Walkthrough

Walkthrough

The PR adds column projection support to vectorized batch reading. Storage declares and implements a new read_batch overload accepting column indices. The vectorized operator adds state for projection and calls the selective-read API in both sequential and parallel paths. The query planner tracks required columns for GROUP BY/aggregate and configures the scan operator accordingly.

Changes

Vectorized Column Projection

Layer / File(s) Summary
Storage layer: Selective column batch reading
include/storage/columnar_table.hpp, src/storage/columnar_table.cpp
New read_batch(start_row, batch_size, out_batch, col_indices) overload reads only specified columns from disk. Deserialization iterates over requested columns, reconstructs values according to logical types, and assumes out_batch is pre-initialized with a reduced schema.
Vectorized operator: Column projection configuration
include/executor/vectorized_operator.hpp
VectorizedSeqScanOperator gains required_col_indices_ and reduced_schema_ state, plus set_required_columns(...) method. Sequential and parallel scan paths conditionally create batches with the reduced schema and call the selective-column read_batch overload when projection is configured.
Plan builder: GROUP BY column requirement tracking
src/executor/query_executor.cpp
QueryExecutor::build_vectorized_plan downcasts to VectorizedSeqScanOperator*, computes base-table columns required by GROUP BY keys and aggregate inputs, and configures the scan operator via set_required_columns(...) with the reduced schema.

🎯 3 (Moderate) | ⏱️ ~22 minutes

Possibly related PRs

  • poyrazK/cloudSQL#59: Complements this PR by wiring GROUP BY/aggregate planning to the new VectorizedSeqScanOperator::set_required_columns(...) API for selective column reading.
  • poyrazK/cloudSQL#157: Builds on the vectorized operator column projection added here to implement DirectIndexAgg GROUP BY paths with selective batch reading.

Poem

🐰 Hopping through columns with care,
We read only what matters down there—
GROUP BY grows light, no excess I/O sight,
Projected batches, perfectly right! 📦✨

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately describes the main change—adding column projection pushdown that significantly improves GROUP BY query performance.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch perf/group-by-optimization-phase3

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@coderabbitai coderabbitai Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
include/executor/vectorized_operator.hpp (1)

177-177: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Use consistent schema initialization in parallel path.

Line 177 unconditionally initializes out_batch with output_schema_, but line 147-148 conditionally uses reduced_schema_ when projection is active. If column projection is enabled, this creates batches with mismatched schemas: parallel results have reduced_schema_, but out_batch has output_schema_. While the copy loop (line 178) only iterates src.column_count() and avoids out-of-bounds issues, the extra columns in out_batch are wasteful.

🔧 Proposed fix for consistency
         if (parallel_idx_ < parallel_results_.size()) {
             auto& src = *parallel_results_[parallel_idx_];
-            out_batch.init_from_schema(output_schema_);
+            out_batch.init_from_schema(required_col_indices_.empty() ? output_schema_ 
+                                                                     : reduced_schema_);
             for (size_t c = 0; c < src.column_count(); ++c) {
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@include/executor/vectorized_operator.hpp` at line 177, The unconditional call
to out_batch.init_from_schema(output_schema_) causes schema mismatch when column
projection is active (parallel path uses reduced_schema_); update the
initialization to use reduced_schema_ when projection is enabled. Locate the
code around out_batch.init_from_schema(...) and change it to initialize with
reduced_schema_ if projection is active (same condition used at lines ~147-148),
ensuring out_batch uses output_schema_ only when projection is not active so the
parallel and non-parallel paths produce consistent batch schemas.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@include/executor/vectorized_operator.hpp`:
- Around line 99-102: set_required_columns currently updates
required_col_indices_ and reduced_schema_ but does not update output_schema_,
causing downstream operators to resolve indices against the full schema while
batches use reduced_schema_; update set_required_columns (the method named
set_required_columns) to also set output_schema_ = reduced_schema_ (or otherwise
replace the operator's output_schema_ with reduced_schema_) so output_schema()
matches the projected batch schema; ensure you modify the same function that
assigns required_col_indices_ and reduced_schema_ and keep reduced_schema_ and
required_col_indices_ semantics intact.

In `@src/executor/query_executor.cpp`:
- Around line 1724-1744: The code builds output_schema (GROUP BY keys +
aggregate result types) and passes it to base_scan->set_required_columns,
causing the scan to initialize output columns with aggregate result types and
crash when storage expects the original input types; instead build a
reduced_schema containing the actual input column types for every index in
required_col_indices (use current_root->output_schema().get_column(idx).type()
for group-by key indices and, for agg_infos entries with input_col_idx >=0, use
the table/input column type rather than the aggregate result type) and pass that
reduced_schema to base_scan->set_required_columns; update references around
output_schema, required_col_indices, agg_infos, and the call
base_scan->set_required_columns to use this corrected schema so
ColumnarTable::read_batch sees matching types.

In `@src/storage/columnar_table.cpp`:
- Around line 310-470: The duplicated per-column deserialization logic in
ColumnarTable::read_batch should be extracted into a private helper (suggested
signature: bool deserialize_column(size_t col_idx, uint64_t start_row, uint32_t
actual_rows, executor::ColumnVector& target_col)) that encapsulates opening
nulls/data files (using storage_manager_ and name_), seeking, reading nulls and
data, and appending values for all supported common::ValueType cases; update
both read_batch overloads to call deserialize_column for each col_idx and
propagate its bool result (leave out_batch.set_row_count/return handling in the
callers). Ensure the helper references schema_.get_column(col_idx).type() and
reuses the same deserialization branches (INT64, INT32/16/8,
FLOAT64/FLOAT32/DECIMAL, BOOL, TEXT) so all file IO and value conversion is
centralized.

---

Outside diff comments:
In `@include/executor/vectorized_operator.hpp`:
- Line 177: The unconditional call to out_batch.init_from_schema(output_schema_)
causes schema mismatch when column projection is active (parallel path uses
reduced_schema_); update the initialization to use reduced_schema_ when
projection is enabled. Locate the code around out_batch.init_from_schema(...)
and change it to initialize with reduced_schema_ if projection is active (same
condition used at lines ~147-148), ensuring out_batch uses output_schema_ only
when projection is not active so the parallel and non-parallel paths produce
consistent batch schemas.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 61f2d892-f533-4c37-ba0d-17316e8169e1

📥 Commits

Reviewing files that changed from the base of the PR and between e46a1f6 and 9747dd4.

📒 Files selected for processing (4)
  • include/executor/vectorized_operator.hpp
  • include/storage/columnar_table.hpp
  • src/executor/query_executor.cpp
  • src/storage/columnar_table.cpp

Comment on lines +99 to +102
void set_required_columns(std::vector<size_t> col_indices, executor::Schema reduced_schema) {
required_col_indices_ = std::move(col_indices);
reduced_schema_ = std::move(reduced_schema);
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

output_schema_ must be updated when column projection is enabled.

When set_required_columns is called, the operator will produce batches with reduced_schema, but output_schema_ (returned by output_schema()) still reflects the full table schema (set at line 82). Downstream operators resolve column indices from child_->output_schema(), then access batch columns at those indices. If the indices refer to the full schema but the batch has a reduced schema, column accesses will retrieve wrong data or crash from out-of-bounds access.

For example:

  • Table schema: [id INT, cat TEXT, val INT]
  • Projection: columns [cat, val] → indices [1, 2] in full schema
  • Batch schema after projection: [cat, val] → positions [0, 1]
  • GROUP BY resolves "cat" → index 1 (from full schema)
  • batch.get_column(1) retrieves "val" (batch position 1) instead of "cat" → wrong result
🐛 Proposed fix
 void set_required_columns(std::vector<size_t> col_indices, executor::Schema reduced_schema) {
     required_col_indices_ = std::move(col_indices);
     reduced_schema_ = std::move(reduced_schema);
+    output_schema_ = reduced_schema_;  // Update output schema to match actual batch schema
 }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@include/executor/vectorized_operator.hpp` around lines 99 - 102,
set_required_columns currently updates required_col_indices_ and reduced_schema_
but does not update output_schema_, causing downstream operators to resolve
indices against the full schema while batches use reduced_schema_; update
set_required_columns (the method named set_required_columns) to also set
output_schema_ = reduced_schema_ (or otherwise replace the operator's
output_schema_ with reduced_schema_) so output_schema() matches the projected
batch schema; ensure you modify the same function that assigns
required_col_indices_ and reduced_schema_ and keep reduced_schema_ and
required_col_indices_ semantics intact.

Comment on lines 1724 to +1744
executor::Schema output_schema;
std::vector<size_t> required_col_indices;
for (const auto& gb : stmt.group_by()) {
const auto& gb_name = gb->to_string();
size_t idx = current_root->output_schema().find_column(gb_name);
if (idx != static_cast<size_t>(-1)) {
output_schema.add_column(current_root->output_schema().get_column(idx).name(),
current_root->output_schema().get_column(idx).type(),
current_root->output_schema().get_column(idx).nullable());
required_col_indices.push_back(idx);
}
}
for (size_t i = 0; i < agg_infos.size(); ++i) {
output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64,
false);
if (agg_infos[i].input_col_idx >= 0) {
required_col_indices.push_back(static_cast<size_t>(agg_infos[i].input_col_idx));
}
}

base_scan->set_required_columns(required_col_indices, output_schema);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Wrong schema passed to set_required_columns — causes type mismatch and crashes.

The output_schema built at lines 1724-1742 is the GROUP BY operator's output schema (keys + aggregate results), not the scan operator's output schema (keys + aggregate inputs). When passed to set_required_columns at line 1744, this causes the scan to initialize out_batch with aggregate result types (e.g., FLOAT64 for SUM) instead of input types (e.g., INT for the SUM input column).

Then in ColumnarTable::read_batch, the code reads the table column type from the table schema (INT), attempts to dynamic_cast the out_batch column to the corresponding vector type (NumericVector<int64_t>), but the column was initialized as FLOAT64 (NumericVector<double>). The cast throws std::bad_cast, crashing the query.

Example:

  • Table: (cat TEXT, val INT)
  • Query: SELECT cat, SUM(val) FROM test_table GROUP BY cat
  • output_schema at line 1744: (cat TEXT, agg_0 FLOAT64) — includes SUM output type
  • required_col_indices: [0, 1] — cat and val
  • Scan creates batch with schema (cat TEXT, agg_0 FLOAT64)
  • Storage tries to deserialize val (INT) into batch column 1 (FLOAT64 vector) → crash
🐛 Proposed fix

Build reduced_schema from the required input columns instead of the GROUP BY output schema:

         executor::Schema output_schema;
         std::vector<size_t> required_col_indices;
+        executor::Schema reduced_input_schema;  // Schema of columns scanned from table
         for (const auto& gb : stmt.group_by()) {
             const auto& gb_name = gb->to_string();
             size_t idx = current_root->output_schema().find_column(gb_name);
             if (idx != static_cast<size_t>(-1)) {
+                const auto& col = current_root->output_schema().get_column(idx);
+                reduced_input_schema.add_column(col.name(), col.type(), col.nullable());
                 output_schema.add_column(current_root->output_schema().get_column(idx).name(),
                                          current_root->output_schema().get_column(idx).type(),
                                          current_root->output_schema().get_column(idx).nullable());
                 required_col_indices.push_back(idx);
             }
         }
         for (size_t i = 0; i < agg_infos.size(); ++i) {
             output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64,
                                      false);
             if (agg_infos[i].input_col_idx >= 0) {
+                size_t input_idx = static_cast<size_t>(agg_infos[i].input_col_idx);
+                const auto& col = current_root->output_schema().get_column(input_idx);
+                reduced_input_schema.add_column(col.name(), col.type(), col.nullable());
                 required_col_indices.push_back(static_cast<size_t>(agg_infos[i].input_col_idx));
             }
         }

-        base_scan->set_required_columns(required_col_indices, output_schema);
+        base_scan->set_required_columns(required_col_indices, reduced_input_schema);
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
executor::Schema output_schema;
std::vector<size_t> required_col_indices;
for (const auto& gb : stmt.group_by()) {
const auto& gb_name = gb->to_string();
size_t idx = current_root->output_schema().find_column(gb_name);
if (idx != static_cast<size_t>(-1)) {
output_schema.add_column(current_root->output_schema().get_column(idx).name(),
current_root->output_schema().get_column(idx).type(),
current_root->output_schema().get_column(idx).nullable());
required_col_indices.push_back(idx);
}
}
for (size_t i = 0; i < agg_infos.size(); ++i) {
output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64,
false);
if (agg_infos[i].input_col_idx >= 0) {
required_col_indices.push_back(static_cast<size_t>(agg_infos[i].input_col_idx));
}
}
base_scan->set_required_columns(required_col_indices, output_schema);
executor::Schema output_schema;
std::vector<size_t> required_col_indices;
executor::Schema reduced_input_schema; // Schema of columns scanned from table
for (const auto& gb : stmt.group_by()) {
const auto& gb_name = gb->to_string();
size_t idx = current_root->output_schema().find_column(gb_name);
if (idx != static_cast<size_t>(-1)) {
const auto& col = current_root->output_schema().get_column(idx);
reduced_input_schema.add_column(col.name(), col.type(), col.nullable());
output_schema.add_column(current_root->output_schema().get_column(idx).name(),
current_root->output_schema().get_column(idx).type(),
current_root->output_schema().get_column(idx).nullable());
required_col_indices.push_back(idx);
}
}
for (size_t i = 0; i < agg_infos.size(); ++i) {
output_schema.add_column("agg_" + std::to_string(i), common::ValueType::TYPE_FLOAT64,
false);
if (agg_infos[i].input_col_idx >= 0) {
size_t input_idx = static_cast<size_t>(agg_infos[i].input_col_idx);
const auto& col = current_root->output_schema().get_column(input_idx);
reduced_input_schema.add_column(col.name(), col.type(), col.nullable());
required_col_indices.push_back(static_cast<size_t>(agg_infos[i].input_col_idx));
}
}
base_scan->set_required_columns(required_col_indices, reduced_input_schema);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/executor/query_executor.cpp` around lines 1724 - 1744, The code builds
output_schema (GROUP BY keys + aggregate result types) and passes it to
base_scan->set_required_columns, causing the scan to initialize output columns
with aggregate result types and crash when storage expects the original input
types; instead build a reduced_schema containing the actual input column types
for every index in required_col_indices (use
current_root->output_schema().get_column(idx).type() for group-by key indices
and, for agg_infos entries with input_col_idx >=0, use the table/input column
type rather than the aggregate result type) and pass that reduced_schema to
base_scan->set_required_columns; update references around output_schema,
required_col_indices, agg_infos, and the call base_scan->set_required_columns to
use this corrected schema so ColumnarTable::read_batch sees matching types.

Comment on lines +310 to +470
for (size_t idx = 0; idx < col_indices.size(); ++idx) {
size_t col_idx = col_indices[idx];
const std::string base = name_ + ".col" + std::to_string(col_idx);
std::ifstream n_in(storage_manager_.get_full_path(base + ".nulls.bin"), std::ios::binary);
std::ifstream d_in(storage_manager_.get_full_path(base + ".data.bin"), std::ios::binary);
if (!n_in.is_open() || !d_in.is_open()) return false;

auto& target_col = out_batch.get_column(idx);
const auto type = schema_.get_column(col_idx).type();

if (type == common::ValueType::TYPE_INT64) {
auto& num_vec = dynamic_cast<executor::NumericVector<int64_t>&>(target_col);

n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> nulls(actual_rows);
n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows);

d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg);
std::vector<int64_t> data(actual_rows);
d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8);

for (uint32_t r = 0; r < actual_rows; ++r) {
if (nulls[r] != 0U) {
num_vec.append(common::Value::make_null());
} else {
num_vec.append(common::Value::make_int64(data[r]));
}
}
} else if (type == common::ValueType::TYPE_INT32 || type == common::ValueType::TYPE_INT16 ||
type == common::ValueType::TYPE_INT8) {
auto& num_vec = dynamic_cast<executor::NumericVector<int64_t>&>(target_col);

n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> nulls(actual_rows);
n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows);

d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg);
std::vector<int64_t> data(actual_rows);
d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8);

for (uint32_t r = 0; r < actual_rows; ++r) {
if (nulls[r] != 0U) {
num_vec.append(common::Value::make_null());
} else if (type == common::ValueType::TYPE_INT32) {
num_vec.append(common::Value(static_cast<int32_t>(data[r])));
} else if (type == common::ValueType::TYPE_INT16) {
num_vec.append(common::Value(static_cast<int16_t>(data[r])));
} else {
num_vec.append(common::Value(static_cast<int8_t>(data[r])));
}
}
} else if (type == common::ValueType::TYPE_FLOAT64) {
auto& num_vec = dynamic_cast<executor::NumericVector<double>&>(target_col);

n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> nulls(actual_rows);
n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows);

d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg);
std::vector<double> data(actual_rows);
d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8);

for (uint32_t r = 0; r < actual_rows; ++r) {
if (nulls[r] != 0U) {
num_vec.append(common::Value::make_null());
} else {
num_vec.append(common::Value::make_float64(data[r]));
}
}
} else if (type == common::ValueType::TYPE_FLOAT32) {
auto& num_vec = dynamic_cast<executor::NumericVector<float>&>(target_col);

n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> nulls(actual_rows);
n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows);

d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg);
std::vector<double> data(actual_rows);
d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8);

for (uint32_t r = 0; r < actual_rows; ++r) {
if (nulls[r] != 0U) {
num_vec.append(common::Value::make_null());
} else {
num_vec.append(common::Value(static_cast<float>(data[r])));
}
}
} else if (type == common::ValueType::TYPE_DECIMAL) {
auto& num_vec = dynamic_cast<executor::NumericVector<double>&>(target_col);

n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> nulls(actual_rows);
n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows);

d_in.seekg(static_cast<std::streamoff>(start_row * 8), std::ios::beg);
std::vector<double> data(actual_rows);
d_in.read(reinterpret_cast<char*>(data.data()), actual_rows * 8);

for (uint32_t r = 0; r < actual_rows; ++r) {
if (nulls[r] != 0U) {
num_vec.append(common::Value::make_null());
} else {
num_vec.append(common::Value::make_float64(data[r]));
}
}
} else if (type == common::ValueType::TYPE_BOOL) {
auto& num_vec = dynamic_cast<executor::NumericVector<bool>&>(target_col);

n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> nulls(actual_rows);
n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows);

d_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> data(actual_rows);
d_in.read(reinterpret_cast<char*>(data.data()), actual_rows);

for (uint32_t r = 0; r < actual_rows; ++r) {
if (nulls[r] != 0U) {
num_vec.append(common::Value::make_null());
} else {
num_vec.append(common::Value(data[r] != 0));
}
}
} else if (type == common::ValueType::TYPE_TEXT ||
type == common::ValueType::TYPE_VARCHAR ||
type == common::ValueType::TYPE_CHAR) {
auto& str_vec = dynamic_cast<executor::StringVector&>(target_col);

n_in.seekg(static_cast<std::streamoff>(start_row), std::ios::beg);
std::vector<uint8_t> nulls(actual_rows);
n_in.read(reinterpret_cast<char*>(nulls.data()), actual_rows);

if (start_row > 0) {
for (uint32_t r = 0; r < start_row; ++r) {
uint32_t len = 0;
if (!d_in.read(reinterpret_cast<char*>(&len), 4)) break;
if (len > 0) {
d_in.seekg(static_cast<std::streamoff>(len), std::ios::cur);
}
}
}

for (uint32_t r = 0; r < actual_rows; ++r) {
uint32_t len = 0;
d_in.read(reinterpret_cast<char*>(&len), 4);
std::string s(len, '\0');
d_in.read(s.data(), len);
if (nulls[r] != 0U) {
str_vec.append(common::Value::make_null());
} else {
str_vec.append(common::Value::make_text(s));
}
}
} else {
throw std::runtime_error("ColumnarTable::read_batch(col_indices): Unsupported type " +
std::to_string(static_cast<int>(type)));
}
}
out_batch.set_row_count(actual_rows);
return true;
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win

Consider extracting shared deserialization logic into a helper method.

The type-dispatch deserialization logic (lines 320-466) is nearly identical to the original read_batch implementation (lines 141-292). This ~150-line duplication creates maintenance burden: any type support changes or bug fixes must be applied in both places, risking divergence.

♻️ Suggested refactoring approach

Extract a private helper method:

private:
    // Helper: deserialize a single column from disk into a vector
    bool deserialize_column(size_t col_idx, uint64_t start_row, uint32_t actual_rows,
                           executor::ColumnVector& target_col);

Then both read_batch overloads can call this helper in their loops, eliminating duplication.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/storage/columnar_table.cpp` around lines 310 - 470, The duplicated
per-column deserialization logic in ColumnarTable::read_batch should be
extracted into a private helper (suggested signature: bool
deserialize_column(size_t col_idx, uint64_t start_row, uint32_t actual_rows,
executor::ColumnVector& target_col)) that encapsulates opening nulls/data files
(using storage_manager_ and name_), seeking, reading nulls and data, and
appending values for all supported common::ValueType cases; update both
read_batch overloads to call deserialize_column for each col_idx and propagate
its bool result (leave out_batch.set_row_count/return handling in the callers).
Ensure the helper references schema_.get_column(col_idx).type() and reuses the
same deserialization branches (INT64, INT32/16/8, FLOAT64/FLOAT32/DECIMAL, BOOL,
TEXT) so all file IO and value conversion is centralized.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant